{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "46cff21c-b285-4390-b2df-abd54dfb221d",
   "metadata": {},
   "outputs": [],
   "source": [
    "pip install confluent-kafka"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "71a6fecc-29e6-4bb5-aad3-3cd5cd7a4697",
   "metadata": {},
   "outputs": [],
   "source": [
    "from confluent_kafka import Producer, Consumer\n",
    "import random\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c0aa2aee-cd4e-4363-a10d-08cc0473e07b",
   "metadata": {},
   "outputs": [],
   "source": [
    "config = {'bootstrap.servers': '[CCloud Server Endpoint]',\n",
    "          'security.protocol': 'SASL_SSL',\n",
    "          'sasl.mechanisms': 'PLAIN',\n",
    "          'sasl.username': '[CCloud API Key]',\n",
    "          'sasl.password': '[CCloud API Secret]'\n",
    "         }"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "8c67cad4-ba6a-4a85-80dd-1ce8b9038a74",
   "metadata": {},
   "outputs": [],
   "source": [
    "pizzas = {1: \"Prairie\", 2: \"Roundup\", 3: \"Stampede\", 4: \"Bronco\", 5: \"Sweet Swine\", 6: \"Texan Taco\"}\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b5eded67-8cf1-4559-976a-f9333028884e",
   "metadata": {},
   "outputs": [],
   "source": [
    "producer = Producer(config)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "dbdba5fa-1ee1-49b4-a36a-4654dc9a73d9",
   "metadata": {},
   "outputs": [],
   "source": [
    "def producer_callback(err, event):\n",
    "    if err:\n",
    "        print(f\"ERROR: {err}\")\n",
    "    else:\n",
    "        val = event.value().decode('utf8')\n",
    "        print(f'{val} sent to topic {event.topic()}.')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c30b9ebe-5d46-4f11-aa67-738151d55d1a",
   "metadata": {},
   "outputs": [],
   "source": [
    "for n in range(10):\n",
    "    pizza = pizzas.get(random.randint(1, 6))\n",
    "    producer.produce('pizzas', value=pizza, on_delivery=producer_callback)\n",
    "producer.flush()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3caab0b3-c1aa-4fe9-b091-8ac272bc6b49",
   "metadata": {},
   "outputs": [],
   "source": [
    "config['group.id'] = 'notebook_tutorial'\n",
    "config['auto.offset.reset'] = 'earliest'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b378c2e9-a906-466e-93f2-99f6aa83c832",
   "metadata": {},
   "outputs": [],
   "source": [
    "consumer = Consumer(config)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7640ad6e-61aa-4f8a-ac29-659ca5d3f635",
   "metadata": {},
   "outputs": [],
   "source": [
    "consumer.subscribe(['pizzas'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c1a81c29-26e8-4fc0-b14a-81ac3f14cd74",
   "metadata": {},
   "outputs": [],
   "source": [
    "for n in range(10):\n",
    "    event = consumer.poll(1.0)\n",
    "    if event is None:\n",
    "        print(f'Waiting…')\n",
    "    elif event.error():\n",
    "        print(f'An error has occurred: {event.error()}')\n",
    "    else:\n",
    "        val = event.value().decode('utf8')\n",
    "        print(f'We received a delicious {val} pizza!')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b777fd2c-14d1-45bf-a9ca-db085b598989",
   "metadata": {},
   "outputs": [],
   "source": [
    "consumer.close()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d35ec447-9318-42c2-9913-4dcf7c241597",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
